{
  "metadata" : {
    "name" : "Clustering Genomes using Adam with LDA",
    "user_save_timestamp" : "1970-01-01T00:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T00:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Set the local dependencies repository"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : ":local-repo /tmp/spark-notebook/repo",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### ADAM dependencies for the notebook"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : ":dp org.bdgenomics.adam % adam-core % 0.16.0\n- org.apache.hadoop % hadoop-client %   _\n- org.apache.spark  % spark-core    %   _\n- org.scala-lang    %     _         %   _\n- org.scoverage     %     _         %   _\n+ org.apache.spark  %  spark-mllib_2.10  % 1.3.1",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Get the master hostname, define a function to build hdfs urls and reset the spark context with appropriate configuration"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Get the master hostname and define a function to build hdfs urls"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import sys.process._\nval master = (\"ec2-metadata --public-hostname\"!!).drop(\"public-hostname: \".size).mkString.trim\ndef hu(s:String) = s\"hdfs://$master:9010/temp/$s\"",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Set the AWS keys to get genotypes from S3"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val fs_s3_awsAccessKeyId      = sys.env.get(\"AWS_ACCESS_KEY_ID\").getOrElse(\"<hard-code-one>\")\nval fs_s3_awsSecretAccessKey  = sys.env.get(\"AWS_SECRET_ACCESS_KEY\").getOrElse(\"<hard-code-one>\")",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Reset the spark context with complete configuration"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "reset(lastChanges = _.set(\"spark.serializer\", \"org.apache.spark.serializer.KryoSerializer\")\n                     .set(\"spark.kryo.registrator\", \"org.bdgenomics.adam.serialization.ADAMKryoRegistrator\")\n                     .set(\"spark.kryoserializer.buffer.mb\", \"4\")\n                     .set(\"spark.kryo.referenceTracking\", \"true\")\n                     .setMaster(s\"spark://$master:7077\")\n                     .setAppName(\"ADAM 1000genomes\")\n                     .set(\"spark.executor.memory\", \"13g\")\n                     .set(\"fs.s3.awsAccessKeyId\", fs_s3_awsAccessKeyId)\n                     .set(\"fs.s3.awsSecretAccessKey\", fs_s3_awsSecretAccessKey)\n)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Import ADAM and other classes"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.hadoop.fs.{FileSystem, Path}\n\n//import org.bdgenomics.adam.converters.{ VCFLine, VCFLineConverter, VCFLineParser }\nimport org.bdgenomics.formats.avro.{Genotype, FlatGenotype}\nimport org.bdgenomics.adam.models.VariantContext\nimport org.bdgenomics.adam.rdd.ADAMContext._\n//import org.bdgenomics.adam.rdd.variation.VariationContext._\nimport org.bdgenomics.adam.rdd.ADAMContext\n  \nimport org.apache.spark.rdd.RDD",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Genotypes\nCreate a Genotype RDD from an ADAM formatted 1000genomes chromosome (22 here) stored on s3"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val adamFileOnS3 = \"s3n://med-at-scale/1000genomes/ALL.chr22.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.adam\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val gts:RDD[Genotype] = sparkContext.adamLoad(adamFileOnS3)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Make a copy on hdfs if required to speed up a bit..."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "gts.adamParquetSave(hu(\"chr22.adam\"))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val gts:RDD[Genotype] = sparkContext.adamLoad(hu(\"chr22.adam\"))\ngts.cache()",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Filter variants with position in 16,000,000-17,000,000"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val start = 16000000\nval end   = 26000000\nval sampledGts = gts.filter(g => (g.getVariant.getStart >= start && g.getVariant.getEnd <= end) )",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Save a local copy of the sample to speedup processing"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "sampledGts.adamParquetSave(hu(\"chr22-sample-10M.adam\"))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val sampledGts:RDD[Genotype] = sparkContext.adamLoad(hu(\"chr22-sample-10M.adam\"))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val rep = sampledGts.coalesce(16)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "rep.adamParquetSave(hu(\"chr22-sample-10M-rep.adam\"))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Population: List of populations selected for analysis"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// populations to select\nval pops = Set(\"GBR\", \"ASW\", \"CHB\")\nval popsList = pops.toList\nval popsIdx = popsList.zipWithIndex.toMap",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "We download on the locafilesystem the list of samples IDs and correspondig populations"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val panelFile = \"/vol0/data/ALL.panel\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "s\"wget ftp://ftp.1000genomes.ebi.ac.uk/vol1/ftp/release/20130502/integrated_call_samples_v3.20130502.ALL.panel -O ${panelFile}\"!!",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Panel"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import scala.io.Source\ndef extract(filter: (String, String) => Boolean= (s, t) => true) = Source.fromFile(panelFile).getLines().map( line => {\n  val toks = line.split(\"\\t\").toList\n  toks(0) -> toks(1)\n}).toMap.filter( tup => filter(tup._1, tup._2) )\n  \n// panel extract from file, filtering by the 2 populations\ndef panel: Map[String,String] = \n  extract((sampleID: String, pop: String) => pops.contains(pop)) \n  \n// broadcast the panel \nval bPanel = sparkContext.broadcast(panel)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Filter genotypes further on selected populations"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val finalGts = sampledGts.filter(g =>  bPanel.value.contains(g.getSampleId))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Count samples"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val sampleCount = finalGts.map(_.getSampleId.toString.hashCode).distinct.count\ns\"#Samples: $sampleCount\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Ensuring completness"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "//utils\nimport scala.collection.JavaConverters._\nimport org.bdgenomics.formats.avro._\ndef variantId(g:Genotype):String = {\n  val name = g.getVariant.getContig.getContigName\n    val start = g.getVariant.getStart\n    val end = g.getVariant.getEnd\n    s\"$name:$start:$end\"\n}\ndef asDouble(g:Genotype):Double = g.getAlleles.asScala.count(_ != GenotypeAllele.Ref)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.SparkContext._\n// A VARIANT SHOULD HAVE sampleCount GENOTYPES\nval variantsById = finalGts.keyBy(g => variantId(g).hashCode).groupByKey.cache\nval missingVariantsRDD = variantsById.filter { case (k, it) => it.size != sampleCount }.keys\n\n// IF SAVING LIST OF MISSING VARIANTS IS REQUIRED...\n//missingVariantsRDD.saveAsObjectFile(hu(\"/model/missing-variants\"))\n\n// could be broadcased as well...\nval missingVariants = missingVariantsRDD.collect().toSet",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val completeGts = finalGts.filter { g => ! (missingVariants contains variantId(g).hashCode) }",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Make dataframe"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "##### Keep lists of samples and index"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// list samples\nval sampleList = completeGts.map{ g => (g.getSampleId.toString) }.distinct.collect\nval sampleIdx = sampleList.zipWithIndex.map(tup => (tup._1, tup._2.toLong)).toMap\n// broadcast the samples list \nval bSampleIdx  = sparkContext.broadcast(sampleIdx)\nval bSampleList = sparkContext.broadcast(sampleList)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Minor allele frequency distribution"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val sampleToData: RDD[(Long, (Double, Int))] =\n    completeGts.map { g => \n        (bSampleIdx.value(g.getSampleId.toString), (asDouble(g), variantId(g).hashCode)) \n}\nval alleleFreqs = completeGts.keyBy(g => variantId(g).hashCode)\n           .mapValues( g => asDouble(g))\n           .mapValues{ d => \n                          if (d == 0) (2,0)\n                          else if (d == 1) (1,1)\n                          else (0,2)\n                         }\n           .aggregateByKey((0,0))\n                         ((t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2),\n                          (t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2)\n                         ) \n           .mapValues{ tup =>\n                     val freqAllele = tup._1 min tup._2\n                     1.0*freqAllele / (tup._1 + tup._2) \n                     }\n           .values\n           .keyBy(d => (d*50).toInt / 50.0)\n           .mapValues(x => 1)\n           .reduceByKey(_+_).collect.toList",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Groups data by sample and encode genotypes as {0,1,2}"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val sampleToData: RDD[(Long, (Double, Int))] =\n    completeGts.map { g => \n        (bSampleIdx.value(g.getSampleId.toString), (asDouble(g), variantId(g).hashCode)) \n}\n\nval groupedSampleToData = sampleToData.groupByKey",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Transform as RDD[Vector]"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.linalg.{Vector=>MLVector, Vectors}\n\ndef makeSortedVector(gts: Iterable[(Double, Int)]): MLVector = {\n   val elements = gts.toArray.sortBy(_._2).map(_._1)\n   Vectors.dense( elements )\n}\n\nval dataPerSampleId:RDD[(Long, MLVector)] =\n    groupedSampleToData.mapValues { it =>\n        makeSortedVector(it)\n    }\ndataPerSampleId.cache()\nval dataFrame:RDD[MLVector] = dataPerSampleId.values.cache()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "dataPerSampleId.first._2.size",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Train a 3 topics (clusters) Latent Dirichlet Allocation Model "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.clustering.{LDA,LDAModel,DistributedLDAModel}\nval model: LDAModel = new LDA().setK(3).run(dataPerSampleId)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "model.vocabSize",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Get the topic0 word probabilities"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val normalizer = new org.apache.spark.mllib.feature.Normalizer()\nval topics = model.topicsMatrix\nval normTopics = (0 to 2).map { topicId =>\n   val topic0 = for (word <- Range(0, model.vocabSize)) yield topics(word, topicId)\n   val normtopic = normalizer.transform(Vectors.dense(topic0.toArray)).toArray\n   normtopic\n             }",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "normTopics.map{ normtopic =>\n               dataFrame.take(10)(7).toArray.zip(normtopic).map(tup => math.log(math.pow(tup._2, tup._1))).reduce(_+_)\n              }",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "normTopics(2)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val normalizer = new org.apache.spark.mllib.feature.Normalizer()\nval normtopic = normalizer.transform(Vectors.dense(topic0.toArray)).toArray",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### compute the sample likelihood given topic0"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val lk0 = dataFrame.first.toArray.zip(normtopic).map(tup => math.log(math.pow(tup._2, tup._1))).reduce(_+_)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "math.exp(lk0)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val topicDistr = model.asInstanceOf[DistributedLDAModel].topicDistributions",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "getSampleId(0)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Prediction for a sample is the topic with maximal topic probability"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val getSampleId: (Int) => String = (i: Int) => sampleList(i)\nval rdd = topicDistr.map{ doc =>\n    val sampleIdx = doc._1.toInt\n    val pred = doc._2.toArray.zipWithIndex.maxBy(_._1)._2                     \n    (sampleIdx, pred)\n}\nval predictions = rdd.collect.map{ case (sampleIdx, pred) =>\n  val sample = getSampleId(sampleIdx)              \n  (sample, (pred, panel.getOrElse(sample, \"\"))) \n}",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### Humm what does the maximal topic probability distribution looks like?"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "topicDistr.map{ doc =>\n         doc._2.toArray.max\n              }\n          .keyBy(d => (d*100).toInt / 100.0)\n          .mapValues(x => 1)\n          .reduceByKey(_+_).collect.toList",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "()",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "#### And the confusion Matrix"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val confMat = predictions.toMap.values\n    .groupBy(_._2).mapValues(_.map(_._1))\n    .mapValues(xs => (1 to 3).map( i => xs.count(_ == i-1)).toList)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "<table>\n<tr><td></td><td>#0</td><td>#1</td><td>#2</td></tr>\n{ for (popu <- confMat) yield\n  <tr><td>{popu._1}</td> { for (cnt <- popu._2) yield \n    <td>{cnt}</td>\n  }\n  </tr>\n}\n</table>",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "## PCA"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.linalg.Matrix\nimport org.apache.spark.mllib.linalg.distributed.RowMatrix",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val trainingMat: RowMatrix = new RowMatrix(trainingDists.map(_.features))\nval testMat: RowMatrix = new RowMatrix(testDists.map(_.features))\n\n",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// Compute the top 10 principal components.\nval pc: Matrix = trainingMat.computePrincipalComponents(10) // Principal components are stored in a local dense matrix.\n",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// Project the rows to the linear space spanned by the top 10 principal components.\nval projected: RowMatrix = trainingMat.multiply(pc)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val projTraining = projected.rows.zip(trainingDists).map{tup =>\n                                     new LabeledPoint(tup._2.label, tup._1)\n                                     }\nval projTest = testMat.multiply(pc).rows.zip(testDists).map{tup =>\n                                     new LabeledPoint(tup._2.label, tup._1)\n                                     }",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "projTest.map{ lp => \n              val arr = lp.features.toArray\n              (arr(0), arr(1))\n                }.collect.toList",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}